{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Deepstream Test4 description\n",
    "--------------------------------------------------------------------------------------\n",
    "This sample builds on top of the deepstream-test1 sample to demonstrate how to:\n",
    "\n",
    "* Use \"nvmsgconv\" and \"nvmsgbroker\" plugins in the pipeline.\n",
    "* Create NVDS_META_EVENT_MSG type of meta and attach to buffer.\n",
    "* Use NVDS_META_EVENT_MSG for different types of objects e.g. vehicle, person etc.\n",
    "* Provide copy / free functions if meta data is extended through \"extMsg\" field.\n",
    "\n",
    "\"nvmsgconv\" plugin uses NVDS_META_EVENT_MSG type of metadata from the buffer\n",
    "and generates the \"DeepStream Schema\" payload in Json format. Static properties\n",
    "of schema are read from configuration file in the form of key-value pair.\n",
    "Check dstest4_msgconv_config.txt for reference. Generated payload is attached\n",
    "as NVDS_META_PAYLOAD type metadata to the buffer.\n",
    "\n",
    "\"nvmsgbroker\" plugin extracts NVDS_META_PAYLOAD type of metadata from the buffer\n",
    "and sends that payload to the server using protocol adaptor APIs.\n",
    "\n",
    "Generating custom metadata for different type of objects:\n",
    "In addition to common fields provided in NvDsEventMsgMeta structure, user can\n",
    "also create custom objects and attach to buffer as NVDS_META_EVENT_MSG metadata.\n",
    "To do that NvDsEventMsgMeta provides \"extMsg\" and \"extMsgSize\" fields. User can\n",
    "create custom structure, fill that structure and assign the pointer of that\n",
    "structure as \"extMsg\" and set the \"extMsgSize\" accordingly.\n",
    "If custom object contains fields that can't be simply mem copied then user should\n",
    "also provide function to copy and free those objects."
   ]
  },
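  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The need for copy functions can be illustrated without DeepStream. The cell below is a\n",
    "minimal sketch, assuming a hypothetical `DemoObject` struct (not a pyds type) that holds\n",
    "a pointer to a C string: a byte-wise copy duplicates the pointer but not the string,\n",
    "so both structs share one buffer, while a deep copy gets a buffer of its own."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import ctypes\n",
    "\n",
    "class DemoObject(ctypes.Structure):\n",
    "    # Hypothetical stand-in for a custom extMsg struct; not a pyds type.\n",
    "    _fields_ = [('age', ctypes.c_int), ('label', ctypes.c_void_p)]\n",
    "\n",
    "# Source object whose 'label' field points at a separately allocated C string.\n",
    "src_label = ctypes.create_string_buffer(b'sedan')\n",
    "src = DemoObject(age=3, label=ctypes.addressof(src_label))\n",
    "\n",
    "# Shallow copy: memmove duplicates the struct bytes, including the pointer value.\n",
    "shallow = DemoObject()\n",
    "ctypes.memmove(ctypes.byref(shallow), ctypes.byref(src), ctypes.sizeof(DemoObject))\n",
    "\n",
    "# Deep copy: plain fields are copied, the string gets its own buffer.\n",
    "deep_label = ctypes.create_string_buffer(src_label.value)\n",
    "deep = DemoObject(age=src.age, label=ctypes.addressof(deep_label))\n",
    "\n",
    "# Changing the source string is visible through the shallow copy only.\n",
    "src_label.value = b'truck'\n",
    "shallow_text = ctypes.string_at(shallow.label)\n",
    "deep_text = ctypes.string_at(deep.label)\n",
    "print(shallow_text, deep_text)"
   ]
  },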
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Installing necessary libraries (kafka)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!sudo apt-get install libglib2.0 libglib2.0-dev\n",
    "!sudo apt-get install libjansson4  libjansson-dev\n",
    "!sudo apt-get install librdkafka1=0.11.3-1build1"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Preparing the necessary arguments \n",
    "\n",
    "cfg_file = Path to adaptor config file\n",
    "\n",
    "input_file = Path to input x264 stream\n",
    "\n",
    "proto_lib = Absolute path to adaptor library\n",
    "\n",
    "conn_str = Connection string of backend server. Optional if it is part of config file.\n",
    "\n",
    "topic = Name of message topic. Optional if it is part of connection string or config file.\n",
    "\n",
    "no_display = To disable display. Default False\n",
    "schema_type = Type of minimal schema. 0= Full. 1= Minimal. Default =0"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "cfg_file = \"cfg_kafka.txt\"\n",
    "input_file = \"\"\n",
    "proto_lib = \"/home/nvidia/Downloads/deepstream/sources/libs/kafka_protocol_adaptor/\"\n",
    "conn_str = \"host;port;topic\"\n",
    "topic = \"\"\n",
    "no_display = False\n",
    "schema_type = 0    "
   ]
  },
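  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Before handing `conn_str` to the broker, the string can be sanity-checked by splitting it\n",
    "into its parts. This is a sketch only: `parse_conn_str` is a hypothetical helper, and the\n",
    "`host;port;topic` layout is taken from the placeholder above, with the topic part treated\n",
    "as optional."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def parse_conn_str(value):\n",
    "    # Hypothetical helper: split 'host;port[;topic]' into its parts.\n",
    "    parts = value.split(';')\n",
    "    if len(parts) not in (2, 3):\n",
    "        raise ValueError('expected host;port or host;port;topic, got %r' % value)\n",
    "    host, port = parts[0], int(parts[1])\n",
    "    topic_name = parts[2] if len(parts) == 3 else None\n",
    "    return host, port, topic_name\n",
    "\n",
    "print(parse_conn_str('localhost;2181;testTopic'))"
   ]
  },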
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Importing necessary libraries"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import sys\n",
    "sys.path.append('../')\n",
    "sys.path.append('/usr/lib/x86_64-linux-gnu/gstreamer-1.0/deepstream/')\n",
    "import gi\n",
    "gi.require_version('Gst', '1.0')\n",
    "from gi.repository import GObject, Gst\n",
    "from gi.repository import GLib\n",
    "import sys\n",
    "import platform\n",
    "from optparse import OptionParser\n",
    "from common.is_aarch_64 import is_aarch64\n",
    "from common.bus_call import bus_call\n",
    "from common.utils import long_to_int\n",
    "import pyds"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Declaring class label ids and other meta data requirements"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "MAX_DISPLAY_LEN=64\n",
    "MAX_TIME_STAMP_LEN=32\n",
    "PGIE_CLASS_ID_VEHICLE = 0\n",
    "PGIE_CLASS_ID_BICYCLE = 1\n",
    "PGIE_CLASS_ID_PERSON = 2\n",
    "PGIE_CLASS_ID_ROADSIGN = 3\n",
    "MUXER_OUTPUT_WIDTH=1920\n",
    "MUXER_OUTPUT_HEIGHT=1080\n",
    "MUXER_BATCH_TIMEOUT_USEC=4000000\n",
    "input_file = \"/home/nvidia/Downloads/deepstream/sources/python/apps/sample_720p.h264\"\n",
    "schema_type = 0\n",
    "frame_number = 0\n",
    "proto_lib = \"/home/nvidia/Downloads/deepstream/sources/libs/kafka_protocol_adaptor/\"\n",
    "conn_str=\"localhost;2181;testTopic\"\n",
    "cfg_file = \"cfg_kafka.txt\"\n",
    "topic = None\n",
    "no_display = False"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Necessary config files and meta data class labels"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "PGIE_CONFIG_FILE=\"dstest4_pgie_config.txt\"\n",
    "MSCONV_CONFIG_FILE=\"dstest4_msgconv_config.txt\"\n",
    "\n",
    "\n",
    "pgie_classes_str=[\"Vehicle\", \"TwoWheeler\", \"Person\",\"Roadsign\"]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Callback function for deep-copying an NvDsEventMsgMeta struct"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def meta_copy_func(data,user_data):\n",
    "    # Cast data to pyds.NvDsUserMeta\n",
    "    user_meta=pyds.glist_get_nvds_user_meta(data)\n",
    "    src_meta_data=user_meta.user_meta_data\n",
    "    # Cast src_meta_data to pyds.NvDsEventMsgMeta\n",
    "    srcmeta=pyds.glist_get_nvds_event_msg_meta(src_meta_data)\n",
    "    # Duplicate the memory contents of srcmeta to dstmeta\n",
    "    # First use pyds.get_ptr() to get the C address of srcmeta, then\n",
    "    # use pyds.memdup() to allocate dstmeta and copy srcmeta into it.\n",
    "    # pyds.memdup returns C address of the allocated duplicate.\n",
    "    dstmeta_ptr=pyds.memdup(pyds.get_ptr(srcmeta), sys.getsizeof(pyds.NvDsEventMsgMeta))\n",
    "    # Cast the duplicated memory to pyds.NvDsEventMsgMeta\n",
    "    dstmeta=pyds.glist_get_nvds_event_msg_meta(dstmeta_ptr)\n",
    "\n",
    "    # Duplicate contents of ts field. Note that reading srcmeat.ts\n",
    "    # returns its C address. This allows to memory operations to be\n",
    "    # performed on it.\n",
    "    dstmeta.ts=pyds.memdup(srcmeta.ts, MAX_TIME_STAMP_LEN+1)\n",
    "\n",
    "    # Copy the sensorStr. This field is a string property.\n",
    "    # The getter (read) returns its C address. The setter (write)\n",
    "    # takes string as input, allocates a string buffer and copies\n",
    "    # the input string into it.\n",
    "    # pyds.get_string() takes C address of a string and returns\n",
    "    # the reference to a string object and the assignment inside the binder copies content.\n",
    "    dstmeta.sensorStr=pyds.get_string(srcmeta.sensorStr)\n",
    "\n",
    "    if(srcmeta.objSignature.size>0):\n",
    "        dstmeta.objSignature.signature=pyds.memdup(srcmeta.objSignature.signature,srcMeta.objSignature.size)\n",
    "        dstmeta.objSignature.size = srcmeta.objSignature.size;\n",
    "\n",
    "    if(srcmeta.extMsgSize>0):\n",
    "        if(srcmeta.objType==pyds.NvDsObjectType.NVDS_OBJECT_TYPE_VEHICLE):\n",
    "            srcobj = pyds.glist_get_nvds_vehicle_object(srcmeta.extMsg);\n",
    "            obj = pyds.alloc_nvds_vehicle_object();\n",
    "            obj.type=pyds.get_string(srcobj.type)\n",
    "            obj.make=pyds.get_string(srcobj.make)\n",
    "            obj.model=pyds.get_string(srcobj.model)\n",
    "            obj.color=pyds.get_string(srcobj.color)\n",
    "            obj.license = pyds.get_string(srcobj.license)\n",
    "            obj.region = pyds.get_string(srcobj.region)\n",
    "            dstmeta.extMsg = obj;\n",
    "            dstmeta.extMsgSize = sys.getsizeof(pyds.NvDsVehicleObject)\n",
    "        if(srcmeta.objType==pyds.NvDsObjectType.NVDS_OBJECT_TYPE_PERSON):\n",
    "            srcobj = pyds.glist_get_nvds_person_object(srcmeta.extMsg);\n",
    "            obj = pyds.alloc_nvds_person_object()\n",
    "            obj.age = srcobj.age\n",
    "            obj.gender = pyds.get_string(srcobj.gender);\n",
    "            obj.cap = pyds.get_string(srcobj.cap)\n",
    "            obj.hair = pyds.get_string(srcobj.hair)\n",
    "            obj.apparel = pyds.get_string(srcobj.apparel);\n",
    "            dstmeta.extMsg = obj;\n",
    "            dstmeta.extMsgSize = sys.getsizeof(pyds.NvDsVehicleObject);\n",
    "\n",
    "    return dstmeta"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Callback function for freeing an NvDsEventMsgMeta instance"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "def meta_free_func(data,user_data):\n",
    "    user_meta=pyds.glist_get_nvds_user_meta(data)\n",
    "    srcmeta=pyds.glist_get_nvds_event_msg_meta(user_meta.user_meta_data)\n",
    "\n",
    "    # pyds.free_buffer takes C address of a buffer and frees the memory\n",
    "    # It's a NOP if the address is NULL\n",
    "    pyds.free_buffer(srcmeta.ts)\n",
    "    pyds.free_buffer(srcmeta.sensorStr)\n",
    "\n",
    "    if(srcmeta.objSignature.size > 0):\n",
    "        pyds.free_buffer(srcmeta.objSignature.signature);\n",
    "        srcmeta.objSignature.size = 0\n",
    "\n",
    "    if(srcmeta.extMsgSize > 0):\n",
    "        if(srcmeta.objType == pyds.NvDsObjectType.NVDS_OBJECT_TYPE_VEHICLE):\n",
    "            obj =pyds.glist_get_nvds_vehicle_object(srcmeta.extMsg)\n",
    "            pyds.free_buffer(obj.type);\n",
    "            pyds.free_buffer(obj.color);\n",
    "            pyds.free_buffer(obj.make);\n",
    "            pyds.free_buffer(obj.model);\n",
    "            pyds.free_buffer(obj.license);\n",
    "            pyds.free_buffer(obj.region);\n",
    "        if(srcmeta.objType == pyds.NvDsObjectType.NVDS_OBJECT_TYPE_PERSON):\n",
    "            obj = pyds.glist_get_nvds_person_object(srcmeta.extMsg);\n",
    "            pyds.free_buffer(obj.gender);\n",
    "            pyds.free_buffer(obj.cap);\n",
    "            pyds.free_buffer(obj.hair);\n",
    "            pyds.free_buffer(obj.apparel);\n",
    "        pyds.free_gbuffer(srcmeta.extMsg);\n",
    "        srcmeta.extMsgSize = 0;"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Generating meta data "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def generate_vehicle_meta(data):\n",
    "    obj = pyds.glist_get_nvds_vehicle_object(data);\n",
    "    obj.type =\"sedan\"\n",
    "    obj.color=\"blue\"\n",
    "    obj.make =\"Bugatti\"\n",
    "    obj.model = \"M\"\n",
    "    obj.license =\"XX1234\"\n",
    "    obj.region =\"CA\"\n",
    "    return obj\n",
    "\n",
    "def generate_person_meta(data):\n",
    "    obj = pyds.glist_get_nvds_person_object(data)\n",
    "    obj.age = 45\n",
    "    obj.cap = \"none\"\n",
    "    obj.hair = \"black\"\n",
    "    obj.gender = \"male\"\n",
    "    obj.apparel= \"formal\"\n",
    "    return obj"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Generating event meta data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def generate_event_msg_meta(data, class_id):\n",
    "    meta =pyds.glist_get_nvds_event_msg_meta(data)\n",
    "    meta.sensorId = 0\n",
    "    meta.placeId = 0\n",
    "    meta.moduleId = 0\n",
    "    meta.sensorStr = \"sensor-0\"\n",
    "    meta.ts = pyds.alloc_buffer(MAX_TIME_STAMP_LEN + 1)\n",
    "    pyds.generate_ts_rfc3339(meta.ts, MAX_TIME_STAMP_LEN)\n",
    "\n",
    "    # This demonstrates how to attach custom objects.\n",
    "    # Any custom object as per requirement can be generated and attached\n",
    "    # like NvDsVehicleObject / NvDsPersonObject. Then that object should\n",
    "    # be handled in payload generator library (nvmsgconv.cpp) accordingly.\n",
    "    if(class_id==PGIE_CLASS_ID_VEHICLE):\n",
    "        meta.type = pyds.NvDsEventType.NVDS_EVENT_MOVING\n",
    "        meta.objType = pyds.NvDsObjectType.NVDS_OBJECT_TYPE_VEHICLE\n",
    "        meta.objClassId = PGIE_CLASS_ID_VEHICLE\n",
    "        obj = pyds.alloc_nvds_vehicle_object()\n",
    "        obj = generate_vehicle_meta(obj)\n",
    "        meta.extMsg = obj\n",
    "        meta.extMsgSize = sys.getsizeof(pyds.NvDsVehicleObject);\n",
    "    if(class_id == PGIE_CLASS_ID_PERSON):\n",
    "        meta.type =pyds.NvDsEventType.NVDS_EVENT_ENTRY\n",
    "        meta.objType = pyds.NvDsObjectType.NVDS_OBJECT_TYPE_PERSON;\n",
    "        meta.objClassId = PGIE_CLASS_ID_PERSON\n",
    "        obj = pyds.alloc_nvds_person_object()\n",
    "        obj=generate_person_meta(obj)\n",
    "        meta.extMsg = obj\n",
    "        meta.extMsgSize = sys.getsizeof(pyds.NvDsPersonObject)\n",
    "    return meta"
   ]
  },
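  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "`generate_event_msg_meta` reserves `MAX_TIME_STAMP_LEN + 1` bytes for the timestamp string.\n",
    "For reference, a comparable RFC 3339 UTC timestamp can be produced in pure Python. This sketch\n",
    "only illustrates the format and the length budget; that the binding writes a string of this\n",
    "shape is an assumption, and `rfc3339_now` and `MAX_TS_LEN` are hypothetical names."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from datetime import datetime, timezone\n",
    "\n",
    "MAX_TS_LEN = 32  # mirrors MAX_TIME_STAMP_LEN defined earlier\n",
    "\n",
    "def rfc3339_now():\n",
    "    # UTC time with millisecond precision and a trailing 'Z'.\n",
    "    now = datetime.now(timezone.utc)\n",
    "    return now.strftime('%Y-%m-%dT%H:%M:%S.') + '%03dZ' % (now.microsecond // 1000)\n",
    "\n",
    "ts_example = rfc3339_now()\n",
    "print(ts_example, len(ts_example))"
   ]
  },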
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Adding a probe to get meta data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## osd_sink_pad_buffer_probe  will extract metadata received on OSD sink pad\n",
    "## and update params for drawing rectangle, object information etc.\n",
    "\n",
    "## IMPORTANT NOTE:\n",
    "#### a) probe() callbacks are synchronous and thus holds the buffer\n",
    "####      (info.get_buffer()) from traversing the pipeline until user return.\n",
    "#### b) loops inside probe() callback could be costly in python.\n",
    "####   So users shall optimize according to their use-case."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def osd_sink_pad_buffer_probe(pad,info,u_data):\n",
    "    frame_number=0\n",
    "    #Intiallizing object counter with 0.\n",
    "    obj_counter = {\n",
    "        PGIE_CLASS_ID_VEHICLE:0,\n",
    "        PGIE_CLASS_ID_PERSON:0,\n",
    "        PGIE_CLASS_ID_BICYCLE:0,\n",
    "        PGIE_CLASS_ID_ROADSIGN:0\n",
    "    }\n",
    "    is_first_object=True\n",
    "    gst_buffer = info.get_buffer()\n",
    "    if not gst_buffer:\n",
    "        print(\"Unable to get GstBuffer \")\n",
    "        return\n",
    "\n",
    "    # Retrieve batch metadata from the gst_buffer\n",
    "    # Note that pyds.gst_buffer_get_nvds_batch_meta() expects the\n",
    "    # C address of gst_buffer as input, which is obtained with hash(gst_buffer)\n",
    "    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))\n",
    "    if not batch_meta:\n",
    "        return Gst.PadProbeReturn.OK\n",
    "    l_frame = batch_meta.frame_meta_list\n",
    "    while l_frame is not None:\n",
    "        try:\n",
    "            # Note that l_frame.data needs a cast to pyds.NvDsFrameMeta\n",
    "            # The casting is done by pyds.glist_get_nvds_frame_meta()\n",
    "            # The casting also keeps ownership of the underlying memory\n",
    "            # in the C code, so the Python garbage collector will leave\n",
    "            # it alone.\n",
    "            frame_meta = pyds.glist_get_nvds_frame_meta(l_frame.data)\n",
    "        except StopIteration:\n",
    "            continue\n",
    "        is_first_object = True;\n",
    "\n",
    "        '''\n",
    "        print(\"Frame Number is \", frame_meta.frame_num)\n",
    "        print(\"Source id is \", frame_meta.source_id)\n",
    "        print(\"Batch id is \", frame_meta.batch_id)\n",
    "        print(\"Source Frame Width \", frame_meta.source_frame_width)\n",
    "        print(\"Source Frame Height \", frame_meta.source_frame_height)\n",
    "        print(\"Num object meta \", frame_meta.num_obj_meta)\n",
    "        '''\n",
    "        frame_number=frame_meta.frame_num\n",
    "        l_obj=frame_meta.obj_meta_list\n",
    "        while l_obj is not None:\n",
    "            try:\n",
    "                obj_meta=pyds.glist_get_nvds_object_meta(l_obj.data)\n",
    "            except StopIteration:\n",
    "                continue\n",
    "\n",
    "            # Update the object text display\n",
    "            txt_params=obj_meta.text_params\n",
    "            if(txt_params.display_text):\n",
    "                pyds.free_buffer(txt_params.display_text)\n",
    "\n",
    "            txt_params.display_text = pgie_classes_str[obj_meta.class_id]\n",
    "\n",
    "            obj_counter[obj_meta.class_id] += 1\n",
    "\n",
    "            # Font , font-color and font-size\n",
    "            txt_params.font_params.font_name = \"Serif\"\n",
    "            txt_params.font_params.font_size = 10\n",
    "            # set(red, green, blue, alpha); set to White\n",
    "            txt_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0);\n",
    "\n",
    "            # Text background color\n",
    "            txt_params.set_bg_clr = 1\n",
    "            # set(red, green, blue, alpha); set to Black\n",
    "            txt_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0);\n",
    "\n",
    "            # Ideally NVDS_EVENT_MSG_META should be attached to buffer by the\n",
    "            # component implementing detection / recognition logic.\n",
    "            # Here it demonstrates how to use / attach that meta data.\n",
    "            if(is_first_object and not (frame_number%30)):\n",
    "                # Frequency of messages to be send will be based on use case.\n",
    "                # Here message is being sent for first object every 30 frames.\n",
    "\n",
    "                # Allocating an NvDsEventMsgMeta instance and getting reference\n",
    "                # to it. The underlying memory is not manged by Python so that\n",
    "                # downstream plugins can access it. Otherwise the garbage collector\n",
    "                # will free it when this probe exits.\n",
    "                msg_meta=pyds.alloc_nvds_event_msg_meta()\n",
    "                msg_meta.bbox.top =  obj_meta.rect_params.top\n",
    "                msg_meta.bbox.left =  obj_meta.rect_params.left\n",
    "                msg_meta.bbox.width = obj_meta.rect_params.width\n",
    "                msg_meta.bbox.height = obj_meta.rect_params.height\n",
    "                msg_meta.frameId = frame_number\n",
    "                msg_meta.trackingId = long_to_int(obj_meta.object_id)\n",
    "                msg_meta.confidence = obj_meta.confidence\n",
    "                msg_meta = generate_event_msg_meta(msg_meta, obj_meta.class_id)\n",
    "                user_event_meta = pyds.nvds_acquire_user_meta_from_pool(batch_meta)\n",
    "                if(user_event_meta):\n",
    "                    user_event_meta.user_meta_data = msg_meta;\n",
    "                    user_event_meta.base_meta.meta_type = pyds.NvDsMetaType.NVDS_EVENT_MSG_META\n",
    "                    # Setting callbacks in the event msg meta. The bindings layer\n",
    "                    # will wrap these callables in C functions. Currently only one\n",
    "                    # set of callbacks is supported.\n",
    "                    pyds.set_user_copyfunc(user_event_meta, meta_copy_func)\n",
    "                    pyds.set_user_releasefunc(user_event_meta, meta_free_func)\n",
    "                    pyds.nvds_add_user_meta_to_frame(frame_meta, user_event_meta)\n",
    "                else:\n",
    "                    print(\"Error in attaching event meta to buffer\\n\")\n",
    "\n",
    "                is_first_object = False\n",
    "            try:\n",
    "                l_obj=l_obj.next\n",
    "            except StopIteration:\n",
    "                break\n",
    "        try:\n",
    "            l_frame=l_frame.next\n",
    "        except StopIteration:\n",
    "            break\n",
    "\n",
    "    print(\"Frame Number =\",frame_number,\"Vehicle Count =\",obj_counter[PGIE_CLASS_ID_VEHICLE],\"Person Count =\",obj_counter[PGIE_CLASS_ID_PERSON])\n",
    "    return Gst.PadProbeReturn.OK"
   ]
  },
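  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The message throttling inside the probe (first object of a frame, every 30th frame) can be\n",
    "isolated as a small pure function, which makes it easy to check without a pipeline. This is\n",
    "a sketch: `should_send_event` and its `interval` parameter are hypothetical names and are not\n",
    "part of the sample."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def should_send_event(frame_index, first_object, interval=30):\n",
    "    # True only for the first object of every interval-th frame.\n",
    "    return first_object and frame_index % interval == 0\n",
    "\n",
    "# Frames 0..89 with three objects each: one event for frames 0, 30 and 60.\n",
    "sent_frames = [\n",
    "    frame\n",
    "    for frame in range(90)\n",
    "    for obj_index in range(3)\n",
    "    if should_send_event(frame, obj_index == 0)\n",
    "]\n",
    "print(sent_frames)"
   ]
  },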
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Initializing GStreamer"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "GObject.threads_init()\n",
    "Gst.init(None)\n",
    "\n",
    "print(\"Creating Pipeline \\n \")\n",
    "\n",
    "pipeline = Gst.Pipeline()\n",
    "\n",
    "if not pipeline:\n",
    "    sys.stderr.write(\" Unable to create Pipeline \\n\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Creating different elements of the stream"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Creating a source element for reading from a file"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\"Creating Source \\n \")\n",
    "source = Gst.ElementFactory.make(\"filesrc\", \"file-source\")\n",
    "if not source:\n",
    "    sys.stderr.write(\" Unable to create Source \\n\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Creating a h264 parser as the input file is an elementary h264 stream"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\"Creating H264Parser \\n\")\n",
    "h264parser = Gst.ElementFactory.make(\"h264parse\", \"h264-parser\")\n",
    "if not h264parser:\n",
    "    sys.stderr.write(\" Unable to create h264 parser \\n\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Using nvdec_h264 for accelerated decoding on GPU"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\"Creating Decoder \\n\")\n",
    "decoder = Gst.ElementFactory.make(\"nvv4l2decoder\", \"nvv4l2-decoder\")\n",
    "if not decoder:\n",
    "    sys.stderr.write(\" Unable to create Nvv4l2 Decoder \\n\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Creating a nvstreammux instance to form batches for one or more sources"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "streammux = Gst.ElementFactory.make(\"nvstreammux\", \"Stream-muxer\")\n",
    "if not streammux:\n",
    "    sys.stderr.write(\" Unable to create NvStreamMux \\n\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Setting up nvinfer to run inference on the decoders output\n",
    "### Note: Behaviour of inference is set through the config file "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pgie = Gst.ElementFactory.make(\"nvinfer\", \"primary-inference\")\n",
    "if not pgie:\n",
    "    sys.stderr.write(\" Unable to create pgie \\n\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Using a converter to convert from NV12 to RGBA as required by nvosd"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "nvvidconv = Gst.ElementFactory.make(\"nvvideoconvert\", \"convertor\")\n",
    "if not nvvidconv:\n",
    "    sys.stderr.write(\" Unable to create nvvidconv \\n\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create OSD to draw on the converted RGBA buffer"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "nvosd = Gst.ElementFactory.make(\"nvdsosd\", \"onscreendisplay\")\n",
    "if not nvosd:\n",
    "    sys.stderr.write(\" Unable to create nvosd \\n\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create a message converter"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "msgconv=Gst.ElementFactory.make(\"nvmsgconv\", \"nvmsg-converter\")\n",
    "if not msgconv:\n",
    "    sys.stderr.write(\" Unable to create msgconv \\n\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create a message broker"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "msgbroker=Gst.ElementFactory.make(\"nvmsgbroker\", \"nvmsg-broker\")\n",
    "if not msgbroker:\n",
    "    sys.stderr.write(\" Unable to create msgbroker \\n\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create a tee "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "tee=Gst.ElementFactory.make(\"tee\", \"nvsink-tee\")\n",
    "if not tee:\n",
    "    sys.stderr.write(\" Unable to create tee \\n\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Creating queues for tee"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "queue1=Gst.ElementFactory.make(\"queue\", \"nvtee-que1\")\n",
    "if not queue1:\n",
    "    sys.stderr.write(\" Unable to create queue1 \\n\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "queue2=Gst.ElementFactory.make(\"queue\", \"nvtee-que2\")\n",
    "if not queue2:\n",
    "    sys.stderr.write(\" Unable to create queue2 \\n\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Creating sink as required based on if display is required"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if (no_display) :\n",
    "    print(\"Creating FakeSink \\n\")\n",
    "    sink = Gst.ElementFactory.make(\"fakesink\", \"fakesink\")\n",
    "    if not sink:\n",
    "        sys.stderr.write(\" Unable to create fakesink \\n\")\n",
    "else:\n",
    "    if is_aarch64():\n",
    "        transform = Gst.ElementFactory.make(\"nvegltransform\", \"nvegl-transform\")\n",
    "\n",
    "    print(\"Creating EGLSink \\n\")\n",
    "    sink = Gst.ElementFactory.make(\"nveglglessink\", \"nvvideo-renderer\")\n",
    "    if not sink:\n",
    "        sys.stderr.write(\" Unable to create egl sink \\n\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Setting PGIE and Streammux properties\n",
    "### Suggested reading: dstest1_pgie_config.txt "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\"Playing file %s \" %input_file)\n",
    "source.set_property('location', input_file)\n",
    "streammux.set_property('width', 1920)\n",
    "streammux.set_property('height', 1080)\n",
    "streammux.set_property('batch-size', 1)\n",
    "streammux.set_property('batched-push-timeout', 4000000)\n",
    "pgie.set_property('config-file-path', PGIE_CONFIG_FILE)\n",
    "msgconv.set_property('config',MSCONV_CONFIG_FILE)\n",
    "msgconv.set_property('payload-type', schema_type)\n",
    "msgbroker.set_property('proto-lib', proto_lib)\n",
    "msgbroker.set_property('conn-str', conn_str)\n",
    "msgbroker.set_property('config', cfg_file)\n",
    "if topic is not None:\n",
    "    msgbroker.set_property('topic', topic)\n",
    "msgbroker.set_property('sync', False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Adding elements to pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\"Adding elements to Pipeline \\n\")\n",
    "pipeline.add(source)\n",
    "pipeline.add(h264parser)\n",
    "pipeline.add(decoder)\n",
    "pipeline.add(streammux)\n",
    "pipeline.add(pgie)\n",
    "pipeline.add(nvvidconv)\n",
    "pipeline.add(nvosd)\n",
    "pipeline.add(tee)\n",
    "pipeline.add(queue1)\n",
    "pipeline.add(queue2)\n",
    "pipeline.add(msgconv)\n",
    "pipeline.add(msgbroker)\n",
    "pipeline.add(sink)\n",
    "if is_aarch64() and not no_display:\n",
    "    pipeline.add(transform)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Linking elements of pipline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\"Linking elements in the Pipeline \\n\")\n",
    "source.link(h264parser)\n",
    "h264parser.link(decoder)\n",
    "\n",
    "sinkpad = streammux.get_request_pad(\"sink_0\")\n",
    "if not sinkpad:\n",
    "    sys.stderr.write(\" Unable to get the sink pad of streammux \\n\")\n",
    "srcpad = decoder.get_static_pad(\"src\")\n",
    "if not srcpad:\n",
    "    sys.stderr.write(\" Unable to get source pad of decoder \\n\")\n",
    "srcpad.link(sinkpad)\n",
    "\n",
    "streammux.link(pgie)\n",
    "pgie.link(nvvidconv)\n",
    "nvvidconv.link(nvosd)\n",
    "nvosd.link(tee)\n",
    "queue1.link(msgconv)\n",
    "msgconv.link(msgbroker)\n",
    "if is_aarch64() and not no_display:\n",
    "    queue2.link(transform)\n",
    "    transform.link(sink)\n",
    "else:\n",
    "    queue2.link(sink)\n",
    "sink_pad=queue1.get_static_pad(\"sink\")\n",
    "tee_msg_pad=tee.get_request_pad('src_%u')\n",
    "tee_render_pad=tee.get_request_pad(\"src_%u\")\n",
    "if not tee_msg_pad or not tee_render_pad:\n",
    "    sys.stderr.write(\"Unable to get request pads\\n\")\n",
    "tee_msg_pad.link(sink_pad)\n",
    "sink_pad=queue2.get_static_pad(\"sink\")\n",
    "tee_render_pad.link(sink_pad)"
   ]
  },
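  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `link()` calls above discard their return values, so a failed link may only surface later\n",
    "as a pipeline that does not start. A minimal sketch of a stricter helper follows. `link_chain`\n",
    "and the `FakeElement` stand-in are hypothetical; the only assumptions about GStreamer are that\n",
    "`Gst.Element.link()` returns a boolean and that elements provide `get_name()`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def link_chain(*elements):\n",
    "    # Link each element to its successor; raise on the first failure.\n",
    "    for upstream, downstream in zip(elements, elements[1:]):\n",
    "        if not upstream.link(downstream):\n",
    "            raise RuntimeError('failed to link %s -> %s'\n",
    "                               % (upstream.get_name(), downstream.get_name()))\n",
    "\n",
    "class FakeElement:\n",
    "    # Stand-in used only to exercise link_chain without a real pipeline.\n",
    "    def __init__(self, name, accepts=True):\n",
    "        self._name = name\n",
    "        self.accepts = accepts\n",
    "        self.linked_to = None\n",
    "\n",
    "    def get_name(self):\n",
    "        return self._name\n",
    "\n",
    "    def link(self, other):\n",
    "        # Mimic a boolean link result: succeed only if the peer accepts.\n",
    "        if other.accepts:\n",
    "            self.linked_to = other.get_name()\n",
    "        return other.accepts\n",
    "\n",
    "fake_src = FakeElement('fake-src')\n",
    "fake_parser = FakeElement('fake-parser')\n",
    "fake_decoder = FakeElement('fake-decoder')\n",
    "link_chain(fake_src, fake_parser, fake_decoder)\n",
    "print(fake_src.linked_to, fake_parser.linked_to)"
   ]
  },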
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Creating an event loop and feeding gstream bus messages to it"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "loop = GObject.MainLoop()\n",
    "bus = pipeline.get_bus()\n",
    "bus.add_signal_watch()\n",
    "bus.connect (\"message\", bus_call, loop)\n",
    "\n",
    "osdsinkpad = nvosd.get_static_pad(\"sink\")\n",
    "if not osdsinkpad:\n",
    "    sys.stderr.write(\" Unable to get sink pad of nvosd \\n\")\n",
    "\n",
    "osdsinkpad.add_probe(Gst.PadProbeType.BUFFER, osd_sink_pad_buffer_probe, 0)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Starting the pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\"Starting pipeline \\n\")\n",
    "\n",
    "# start play back and listed to events\n",
    "pipeline.set_state(Gst.State.PLAYING)\n",
    "try:\n",
    "    loop.run()\n",
    "except:\n",
    "    pass"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Cleaning up post successful run"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pyds.unset_callback_funcs()\n",
    "pipeline.set_state(Gst.State.NULL)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.9"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
